AQS同步机制源码分析(一)

AQS 即AbStractQueuedSynchronizer 抽象队列同步器,他是一个并发同步器框架,许多锁的实现正是基于AQS实现的,比如ReentrantLock,Semaphore,ReadWriteLock等等都是,可以说这是java并发框架类最为重要的一个类。

它的继承结构如下:

1
2
3
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable

可以看到它是一个抽象类,继承自AbstractOwnableSynchronizer,它是一种再独占模式中的抽象独占同步器。

AQS的为我们提供了统一的实现锁的框架,它实现了如下基本功能:

  1. 获取同步状态
    如果允许,则获取锁,如果不允许就阻塞线程,直到同步状态允许获取。
  2. 释放同步状态
    修改同步状态,并且唤醒等待线程

同时AQS同步机制同时满足了如下需求:

  1. 独占锁和共享锁两种机制
  2. 线程阻塞后,如果需要取消,需要支持中断
  3. 线程阻塞后,如果有超时要求,应该支持超时中断机制

为了实现AQS的基本功能,其内部维护了一个同步状态state(通过volatile和CAS来保证其原子性和可见性)和一个CLH FIFO队列(非阻塞队列),队列的节点Node代表了线程.

Node节点的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
static final class Node {  
       /** 线程已被取消 */  
      static final int CANCELLED =  1;  
              
      /** 当前线程的后继线程需要被unpark(唤醒) */  
      static final int SIGNAL    = -1;  
              
      /** 线程(处在Condition休眠状态)在等待Condition唤醒 */  
      static final int CONDITION = -2;  
              
     /** 共享锁 */  
      static final Node SHARED = new Node();  
      /** 独占锁  */  
      static final Node EXCLUSIVE = null;  
      volatile int waitStatus;  
  
      /** 前继节点 */  
      volatile Node prev;  
  
      /** 后继节点 */  
      volatile Node next;  
              
      volatile Thread thread;  
  
      Node nextWaiter;  
  
      final boolean isShared() {  
          return nextWaiter == SHARED;  
      }  
  
      /** 获取前继节点 */  
      final Node predecessor() throws NullPointerException {  
           Node p = prev;  
           if (p == null)  
               throw new NullPointerException();  
          else  
              return p;  
      }  
  
     /* 构造函数*/  
      Node() {}  
  
      Node(Thread thread, Node mode) {     
          this.nextWaiter = mode;  
          this.thread = thread;  
      }  
  
      Node(Thread thread, int waitStatus) {   
          this.waitStatus = waitStatus;  
          this.thread = thread;  
      }  
}

基于AQS的实现有两种锁的模式,分别为独占锁和共享锁。基于这两种模式我们分别讨论同步状态的获取和释放。

独占模式

AQS的独占锁在获取锁时会通过acquire来获取同步状态,我们看看这个方法

独占:

1
2
3
4
5
6
7
8
9
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

独占可中断:

1
2
3
4
5
6
7
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

独占可超时:

1
2
3
4
5
6
7
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

tryAcquire在AQS的未实现,它是由子类实现的,返回true,表示成功,线程继续,否则失败,阻塞线程并加入队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

释放成功会唤醒后续节点,这里tryRelease同样在AQS中未实现,需要在子类中处理。

共享模式

共享:

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

共享可中断:

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

共享可超时:

1
2
3
4
5
6
7
8
9
10
11
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}

protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

同tryAcquire类似,tryAcquireShared在AQS中未实现,它同样是由子类实现同步状态的更新获取逻辑的。

1
2
3
4
5
6
7
8
9
10
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

可以看出,AQS通过一些tryAcquirexx tryReleasexx方法提供了同步器的实现框架,这些方法主要是来更新或者获取state同步状态的,而AQS基于其返回值决定是否阻塞执行线程并加入到其队列中。

使用场景

在竞争激烈的多线程环境下,使用非公平锁可以很大程度上提高系统运行的效率,如果一个线程持有锁时间过长那么每次获取非公平锁需要额外的一次CAS操作。

总结

AQS是通过volatile结合CAS实现抽象并发同步器,这种方式是java concurrent包实现的基础,一个通用化的实现模式:

  1. 首先,声明共享变量为volatile。
  2. 然后,使用CAS的原子条件更新来实现线程之间的同步。
  3. 同时,配合以volatile的读/写和所具有的volatile读和写的内存语义来实现线程之间的通信。
    AQS内部通过一个非阻塞式的FIFO队列和volatile变量state来实现,FIFO队列和state都是通过CAS来实现同步。在内存语义上CAS具有volatile读和volatile写一样的效果,同时CAS保证了共享变量的原子性。
坚持原创技术分享,您的支持将鼓励我继续创作!